package org.ektorp.impl.changes;

import cn.emagsoftware.sdk.f.b;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.InterruptedIOException;
import java.io.UnsupportedEncodingException;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.codehaus.jackson.JsonParseException;
import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.node.NullNode;
import org.ektorp.changes.ChangesFeed;
import org.ektorp.changes.DocumentChange;
import org.ektorp.http.HttpResponse;
import org.ektorp.util.Exceptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: classes.dex */
/**
 * {@link ChangesFeed} implementation that reads a line-oriented HTTP response on a
 * dedicated background thread and hands the parsed changes to consumers through a
 * bounded queue.
 *
 * <p>Every non-empty line read from the response is parsed as JSON and wrapped in a
 * {@link StdDocumentChange}; empty lines are treated as heartbeats and only logged.
 * When the listener thread stops (cancellation, end of stream or an exception) it
 * offers an internal marker object to the queue so that a consumer waiting in
 * {@link #next()} is woken up with an {@link InterruptedException}.
 */
public final class ContinuousChangesFeed implements ChangesFeed, Runnable {
    /** Maximum number of parsed changes buffered before the listener thread blocks in {@code put}. */
    private static final int QUEUE_CAPACITY = 100;

    /** Source of the numeric suffix used in listener thread names. */
    private static final AtomicInteger THREAD_COUNT = new AtomicInteger();
    private static final Logger LOG = LoggerFactory.getLogger(ContinuousChangesFeed.class);
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    /** Sentinel placed on the queue to tell a consumer that the feed has stopped; compared by identity. */
    private static final DocumentChange INTERRUPT_MARKER = new StdDocumentChange(NullNode.getInstance());

    /** Exception classes that are logged as an interruption rather than as an error. */
    private static final Set<Class<?>> INTERRUPTED_EXCEPTION_TYPES = new HashSet<Class<?>>();

    static {
        INTERRUPTED_EXCEPTION_TYPES.add(InterruptedException.class);
        INTERRUPTED_EXCEPTION_TYPES.add(InterruptedIOException.class);
    }

    private final HttpResponse httpResponse;
    private final BufferedReader reader;
    private final BlockingQueue<DocumentChange> changes = new LinkedBlockingQueue<DocumentChange>(QUEUE_CAPACITY);
    private final Thread thread = new Thread(this);

    /** Cleared by {@link #cancel()}; volatile because it is written and read by different threads. */
    private volatile boolean shouldRun = true;

    /**
     * Wraps the response content in a reader and immediately starts the listener thread.
     *
     * @param str          label embedded in the listener thread's name
     *                     (presumably the database name; confirm against callers)
     * @param httpResponse response whose content is read line by line
     */
    public ContinuousChangesFeed(String str, HttpResponse httpResponse) {
        this.httpResponse = httpResponse;
        try {
            // NOTE(review): b.cC is a charset-name constant from an unrelated, obfuscated class
            // (likely a decompiler substitution, presumably "UTF-8"); confirm its value.
            this.reader = new BufferedReader(new InputStreamReader(httpResponse.getContent(), b.cC));
        } catch (UnsupportedEncodingException e) {
            throw Exceptions.propagate(e);
        }
        this.thread.setName(
                String.format("ektorp-%s-changes-listening-thread-%s", str, THREAD_COUNT.getAndIncrement()));
        // Started last, after every field has been assigned, so run() never sees a partially built instance.
        this.thread.start();
    }

    /**
     * Guards the consumer-facing methods.
     *
     * @throws IllegalStateException if the listener thread is not alive
     */
    private void assertRunningState() {
        if (!isAlive()) {
            throw new IllegalStateException("Changes feed is not alive");
        }
    }

    /**
     * Converts the end-of-feed conditions into an {@link InterruptedException}.
     *
     * @param documentChange the element just removed from the queue, possibly {@code null}
     * @throws InterruptedException if the element is the interrupt marker, or if the feed has
     *                              been cancelled and the queue is empty
     */
    private void checkIfInterrupted(DocumentChange documentChange) throws InterruptedException {
        if (documentChange == INTERRUPT_MARKER) {
            throw new InterruptedException();
        }
        if (!this.shouldRun && this.changes.isEmpty()) {
            throw new InterruptedException();
        }
    }

    /**
     * Parses one line as JSON and queues the resulting change, blocking while the queue is full.
     */
    private void handleChange(String str) throws IOException, InterruptedException, JsonParseException, JsonMappingException {
        this.changes.put(new StdDocumentChange(OBJECT_MAPPER.readTree(str)));
    }

    /**
     * Logs the exception that ended the listener loop. Only an exact class match against
     * {@link #INTERRUPTED_EXCEPTION_TYPES} is logged as an interruption; everything else,
     * including subclasses of those types, is logged as an error.
     */
    private void handleException(Exception exc) {
        if (INTERRUPTED_EXCEPTION_TYPES.contains(exc.getClass())) {
            LOG.info("Changes feed was interrupted");
        } else {
            LOG.error("Caught exception while listening to changes feed:", exc);
        }
    }

    /** Called for every empty line read from the response. */
    private void handleHeartbeat() {
        LOG.debug("Got heartbeat from DB");
    }

    /**
     * Offers the interrupt marker to the queue without blocking.
     */
    private void sendInterruptMarker() {
        LOG.debug("Sending interrupt marker in order to interrupt feed consumer");
        // NOTE(review): offer() returns false instead of blocking, so the marker is silently
        // dropped when the queue is already full.
        this.changes.offer(INTERRUPT_MARKER);
    }

    /** Closes the reader, treating a failure as non-fatal because the feed is shutting down anyway. */
    private void closeReader() {
        try {
            this.reader.close();
        } catch (IOException e) {
            LOG.debug("Ignoring failure to close changes feed reader", e);
        }
    }

    /**
     * Requests the listener loop to stop and interrupts the listener thread.
     */
    @Override
    public void cancel() {
        LOG.debug("Feed cancelled");
        this.shouldRun = false;
        this.thread.interrupt();
    }

    /**
     * @return {@code true} while the listener thread is alive
     */
    @Override
    public boolean isAlive() {
        return this.thread.isAlive();
    }

    /**
     * Removes and returns the next change, waiting until one is available.
     *
     * @throws IllegalStateException if the listener thread is not alive
     * @throws InterruptedException  if the calling thread is interrupted while waiting, or the
     *                               feed has stopped (see {@link #checkIfInterrupted})
     */
    @Override
    public DocumentChange next() throws InterruptedException {
        assertRunningState();
        DocumentChange change = this.changes.take();
        checkIfInterrupted(change);
        return change;
    }

    /**
     * Removes and returns the next change, waiting at most the given time.
     *
     * @param j        maximum time to wait
     * @param timeUnit unit of {@code j}
     * @return the next change, or {@code null} if none became available in time
     * @throws IllegalStateException if the listener thread is not alive
     * @throws InterruptedException  if the calling thread is interrupted while waiting, or the
     *                               feed has stopped (see {@link #checkIfInterrupted})
     */
    @Override
    public DocumentChange next(long j, TimeUnit timeUnit) throws InterruptedException {
        assertRunningState();
        DocumentChange change = this.changes.poll(j, timeUnit);
        checkIfInterrupted(change);
        return change;
    }

    /**
     * Removes and returns the next change without waiting.
     *
     * @return the next change, or {@code null} if the queue is currently empty
     * @throws IllegalStateException if the listener thread is not alive
     * @throws InterruptedException  if the feed has stopped (see {@link #checkIfInterrupted})
     */
    @Override
    public DocumentChange poll() throws InterruptedException {
        assertRunningState();
        DocumentChange change = this.changes.poll();
        checkIfInterrupted(change);
        return change;
    }

    /**
     * @return the number of elements currently held in the queue
     */
    @Override
    public int queueSize() {
        return this.changes.size();
    }

    /**
     * Listener loop: reads the response line by line until cancelled, end of stream or an
     * exception, then performs the shutdown sequence exactly once.
     */
    @Override
    public void run() {
        try {
            String line = this.reader.readLine();
            while (this.shouldRun && line != null) {
                if (line.length() > 0) {
                    handleChange(line);
                } else {
                    handleHeartbeat();
                }
                line = this.reader.readLine();
            }
            LOG.info("Changes feed stopped. Reason: {}", this.shouldRun ? "EOF" : "Cancelled");
        } catch (Exception e) {
            handleException(e);
        } finally {
            // Single shutdown path for normal exit, handled exceptions and propagating errors.
            sendInterruptMarker();
            this.httpResponse.abort();
            closeReader();
        }
    }
}
